{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Deepstream Test4 description\n",
    "--------------------------------------------------------------------------------------\n",
    "This sample builds on top of the deepstream-test1 sample to demonstrate how to:\n",
    "\n",
    "* Use \"nvmsgconv\" and \"nvmsgbroker\" plugins in the pipeline.\n",
    "* Create NVDS_META_EVENT_MSG type of meta and attach to buffer.\n",
    "* Use NVDS_META_EVENT_MSG for different types of objects e.g. vehicle, person etc.\n",
    "* copy / free functions if meta data is extended through \"extMsg\" field have been moved out of the deepstream_test4 python app.\n",
    "These copy and free functions are available for using/extending inside bindschema.cpp as event_msg_meta_copy_func() and\n",
    "event_msg_meta_release_func() respectively.\n",
    "\n",
    "\"nvmsgconv\" plugin uses NVDS_META_EVENT_MSG type of metadata from the buffer\n",
    "and generates the \"DeepStream Schema\" payload in Json format. Static properties\n",
    "of schema are read from configuration file in the form of key-value pair.\n",
    "Check dstest4_msgconv_config.txt for reference. Generated payload is attached\n",
    "as NVDS_META_PAYLOAD type metadata to the buffer.\n",
    "\n",
    "\"nvmsgbroker\" plugin extracts NVDS_META_PAYLOAD type of metadata from the buffer\n",
    "and sends that payload to the server using protocol adaptor APIs.\n",
    "\n",
    "Generating custom metadata for different type of objects:\n",
    "In addition to common fields provided in NvDsEventMsgMeta structure, user can\n",
    "also create custom objects and attach to buffer as NVDS_META_EVENT_MSG metadata.\n",
    "To do that NvDsEventMsgMeta provides \"extMsg\" and \"extMsgSize\" fields. User can\n",
    "create custom structure, fill that structure and assign the pointer of that\n",
    "structure as \"extMsg\" and set the \"extMsgSize\" accordingly.\n",
    "If custom object contains fields that can't be simply mem copied then user should\n",
    "also provide/extend the functions to copy - event_msg_meta_copy_func() and free - event_msg_meta_release_func() those objects."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Prerequisites"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "   * **DeepStream SDK 6.4**\n",
    "\n",
    "To setup and install DeepStream 6.4, please follow the steps at https://developer.nvidia.com/deepstream-getting-started\n",
    "    \n",
    "   * **DeepStream Python Apps**\n",
    "\n",
    "To install DeepStream Python Apps, follow the instructions in this repo: https://github.com/NVIDIA-AI-IOT/deepstream_python_apps\n",
    "\n",
    "   * [**Jupyter Notebook**](https://jupyter.org/install)\n",
    "\n",
    "To install jupyter notebook, run the following command:\n",
    "\n",
    "```\n",
    "pip3 install notebook\n",
    "```  \n",
    "<br>\n",
    "* **Build dependencies with installation instructions:**\n",
    "\n",
    "&nbsp;&nbsp;&nbsp;&nbsp;__librdkafka__\n",
    "\n",
    "  Note that for using TLS/SSL security, make sure to build librdkafka with\n",
    "  SSL suport enabled by using the enable_ssl option while running\n",
    "  'configure', as shown below. Also note that these are the same steps as setup in [Deepstream Quickstart Guide](https://docs.nvidia.com/metropolis/deepstream/dev-guide/text/DS_Quickstart.html#id1). If already previously completed, please skip this step. \n",
    "```\n",
    "  git clone https://github.com/confluentinc/librdkafka.git\n",
    "  cd librdkafka\n",
    "  git checkout tags/v2.2.0\n",
    "  ./configure --enable-ssl\n",
    "  make\n",
    "  sudo make install\n",
    "  sudo cp /usr/local/lib/librdkafka* /opt/nvidia/deepstream/deepstream/lib/\n",
    "  sudo ldconfig\n",
    "```\n",
    "NOTE: To compile the sources, run make with \"sudo\" or root permission.\n",
    "\n",
    "&nbsp;&nbsp;&nbsp;&nbsp;__glib 2.0__\n",
    "   \n",
    "```\n",
    "  apt-get install libglib2.0 libglib2.0-dev \n",
    "```\n",
    "<br>\n",
    "  \n",
    "&nbsp;&nbsp;&nbsp;&nbsp;__jansson__\n",
    "    \n",
    "```\n",
    "  apt-get install  libjansson4  libjansson-dev\n",
    "``` \n",
    "<br>\n",
    "  \n",
    "&nbsp;&nbsp;&nbsp;&nbsp;__ssl__\n",
    "   \n",
    "```\n",
    "  apt-get install libssl-dev\n",
    " ```\n",
    "<br>\n",
    "\n",
    "* (Optional): Install & setup kafka broker on your machine & create topic(s). See instructions here: https://kafka.apache.org/quickstart (These are the same steps as deepstream-test4)\n",
    "\n",
    " 1. Install kafka server\n",
    " 2. In a terminal tab, navigate to where the kafka server is installed. Run\n",
    "```bin/zookeeper-server-start.sh config/zookeeper.properties```\n",
    " 3. Open another tab in terminal. Run ``bin/kafka-server-start.sh config/server.properties`` from the same directory where kafka is installed.\n",
    " 4. Open another tab in terminal. Run ``bin/kafka-topics.sh --create --topic topic --bootstrap-server localhost:9092``. You may need to delete an old topic (``./bin/kafka-topics.sh --delete --topic topic --bootstrap-server localhost:9092``) and recreate if the topic is already created. Again, run in same directory where kafka is installed."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Importing necessary libraries"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sys\n",
    "\n",
    "sys.path.append('../')\n",
    "sys.path.append('../apps/')\n",
    "import gi\n",
    "\n",
    "gi.require_version('Gst', '1.0')\n",
    "from gi.repository import GLib, Gst\n",
    "import sys\n",
    "from optparse import OptionParser\n",
    "from common.is_aarch_64 import is_aarch64\n",
    "from common.bus_call import bus_call\n",
    "from common.utils import long_to_uint64\n",
    "import pyds"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Declaring class label ids and other meta data requirements"
   ]
  },
  {
   "cell_type": "raw",
   "metadata": {},
   "source": [
    "## Preparing the necessary arguments \n",
    "\n",
    "cfg_file = Path to adaptor config file\n",
    "\n",
    "input_file = Path to input x264 stream\n",
    "\n",
    "proto_lib = Absolute path to kafka proto file\n",
    "\n",
    "conn_str = Connection string of backend server. Optional if it is part of config file.\n",
    "\n",
    "topic = Name of message topic. Optional if it is part of connection string or config file.\n",
    "\n",
    "no_display = To disable display. Default False\n",
    "schema_type = Type of minimal schema. 0= Full. 1= Minimal. Default =0"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Example: \n",
    "cfg_file = \"../apps/deepstream-test4/cfg_kafka.txt\"\n",
    "input_file = \"/opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.h264\"\n",
    "proto_lib = \"/opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so\"\n",
    "conn_str = \"localhost;2181;testTopic\"\n",
    "topic = \"topic\"\n",
    "no_display = False\n",
    "schema_type = 0    "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "MAX_DISPLAY_LEN = 64\n",
    "MAX_TIME_STAMP_LEN = 32\n",
    "PGIE_CLASS_ID_VEHICLE = 0\n",
    "PGIE_CLASS_ID_BICYCLE = 1\n",
    "PGIE_CLASS_ID_PERSON = 2\n",
    "PGIE_CLASS_ID_ROADSIGN = 3\n",
    "MUXER_OUTPUT_WIDTH = 1920\n",
    "MUXER_OUTPUT_HEIGHT = 1080\n",
    "MUXER_BATCH_TIMEOUT_USEC = 33000\n",
    "cfg_file = \"../apps/deepstream-test4/cfg_kafka.txt\"\n",
    "input_file = \"/opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.h264\"\n",
    "proto_lib = \"/opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so\"\n",
    "conn_str = \"localhost;2181;testTopic\" # make sure to change to your own connection string\n",
    "topic = \"topic\"\n",
    "no_display = False\n",
    "schema_type = 0    "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Necessary config files and meta data class labels"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "PGIE_CONFIG_FILE = \"../apps/deepstream-test4/dstest4_pgie_config.txt\"\n",
    "MSCONV_CONFIG_FILE = \"../apps/deepstream-test4/dstest4_msgconv_config.txt\"\n",
    "pgie_classes_str = [\"Vehicle\", \"TwoWheeler\", \"Person\", \"Roadsign\"]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Generating meta data "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def generate_vehicle_meta(data):\n",
    "    obj = pyds.NvDsVehicleObject.cast(data)\n",
    "    obj.type = \"sedan\"\n",
    "    obj.color = \"blue\"\n",
    "    obj.make = \"Bugatti\"\n",
    "    obj.model = \"M\"\n",
    "    obj.license = \"XX1234\"\n",
    "    obj.region = \"CA\"\n",
    "    return obj\n",
    "\n",
    "\n",
    "def generate_person_meta(data):\n",
    "    obj = pyds.NvDsPersonObject.cast(data)\n",
    "    obj.age = 45\n",
    "    obj.cap = \"none\"\n",
    "    obj.hair = \"black\"\n",
    "    obj.gender = \"male\"\n",
    "    obj.apparel = \"formal\"\n",
    "    return obj"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Generating event meta data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def generate_event_msg_meta(data, class_id):\n",
    "    meta = pyds.NvDsEventMsgMeta.cast(data)\n",
    "    meta.sensorId = 0\n",
    "    meta.placeId = 0\n",
    "    meta.moduleId = 0\n",
    "    meta.sensorStr = \"sensor-0\"\n",
    "    meta.ts = pyds.alloc_buffer(MAX_TIME_STAMP_LEN + 1)\n",
    "    pyds.generate_ts_rfc3339(meta.ts, MAX_TIME_STAMP_LEN)\n",
    "\n",
    "    # This demonstrates how to attach custom objects.\n",
    "    # Any custom object as per requirement can be generated and attached\n",
    "    # like NvDsVehicleObject / NvDsPersonObject. Then that object should\n",
    "    # be handled in payload generator library (nvmsgconv.cpp) accordingly.\n",
    "    if class_id == PGIE_CLASS_ID_VEHICLE:\n",
    "        meta.type = pyds.NvDsEventType.NVDS_EVENT_MOVING\n",
    "        meta.objType = pyds.NvDsObjectType.NVDS_OBJECT_TYPE_VEHICLE\n",
    "        meta.objClassId = PGIE_CLASS_ID_VEHICLE\n",
    "        obj = pyds.alloc_nvds_vehicle_object()\n",
    "        obj = generate_vehicle_meta(obj)\n",
    "        meta.extMsg = obj\n",
    "        meta.extMsgSize = sys.getsizeof(pyds.NvDsVehicleObject)\n",
    "    if class_id == PGIE_CLASS_ID_PERSON:\n",
    "        meta.type = pyds.NvDsEventType.NVDS_EVENT_ENTRY\n",
    "        meta.objType = pyds.NvDsObjectType.NVDS_OBJECT_TYPE_PERSON\n",
    "        meta.objClassId = PGIE_CLASS_ID_PERSON\n",
    "        obj = pyds.alloc_nvds_person_object()\n",
    "        obj = generate_person_meta(obj)\n",
    "        meta.extMsg = obj\n",
    "        meta.extMsgSize = sys.getsizeof(pyds.NvDsPersonObject)\n",
    "    return meta"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Adding a probe to get meta data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## osd_sink_pad_buffer_probe  will extract metadata received on OSD sink pad\n",
    "## and update params for drawing rectangle, object information etc.\n",
    "\n",
    "## IMPORTANT NOTE:\n",
    "#### a) probe() callbacks are synchronous and thus holds the buffer\n",
    "####      (info.get_buffer()) from traversing the pipeline until user return.\n",
    "#### b) loops inside probe() callback could be costly in python.\n",
    "####   So users shall optimize according to their use-case."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def osd_sink_pad_buffer_probe(pad, info, u_data):\n",
    "    frame_number = 0\n",
    "    # Intiallizing object counter with 0.\n",
    "    obj_counter = {\n",
    "        PGIE_CLASS_ID_VEHICLE: 0,\n",
    "        PGIE_CLASS_ID_PERSON: 0,\n",
    "        PGIE_CLASS_ID_BICYCLE: 0,\n",
    "        PGIE_CLASS_ID_ROADSIGN: 0\n",
    "    }\n",
    "    gst_buffer = info.get_buffer()\n",
    "    if not gst_buffer:\n",
    "        print(\"Unable to get GstBuffer \")\n",
    "        return\n",
    "\n",
    "    # Retrieve batch metadata from the gst_buffer\n",
    "    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the\n",
    "    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)\n",
    "    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))\n",
    "    if not batch_meta:\n",
    "        return Gst.PadProbeReturn.OK\n",
    "    l_frame = batch_meta.frame_meta_list\n",
    "    while l_frame is not None:\n",
    "        try:\n",
    "            # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta\n",
    "            # The casting is done by pyds.NvDsFrameMeta.cast()\n",
    "            # The casting also keeps ownership of the underlying memory\n",
    "            # in the C code, so the Python garbage collector will leave\n",
    "            # it alone.\n",
    "            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)\n",
    "        except StopIteration:\n",
    "            continue\n",
    "        is_first_object = True\n",
    "\n",
    "        # Short example of attribute access for frame_meta:\n",
    "        # print(\"Frame Number is \", frame_meta.frame_num)\n",
    "        # print(\"Source id is \", frame_meta.source_id)\n",
    "        # print(\"Batch id is \", frame_meta.batch_id)\n",
    "        # print(\"Source Frame Width \", frame_meta.source_frame_width)\n",
    "        # print(\"Source Frame Height \", frame_meta.source_frame_height)\n",
    "        # print(\"Num object meta \", frame_meta.num_obj_meta)\n",
    "\n",
    "        frame_number = frame_meta.frame_num\n",
    "        l_obj = frame_meta.obj_meta_list\n",
    "        while l_obj is not None:\n",
    "            try:\n",
    "                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)\n",
    "            except StopIteration:\n",
    "                continue\n",
    "\n",
    "            # Update the object text display\n",
    "            txt_params = obj_meta.text_params\n",
    "\n",
    "            # Set display_text. Any existing display_text string will be\n",
    "            # freed by the bindings module.\n",
    "            txt_params.display_text = pgie_classes_str[obj_meta.class_id]\n",
    "\n",
    "            obj_counter[obj_meta.class_id] += 1\n",
    "\n",
    "            # Font , font-color and font-size\n",
    "            txt_params.font_params.font_name = \"Serif\"\n",
    "            txt_params.font_params.font_size = 10\n",
    "            # set(red, green, blue, alpha); set to White\n",
    "            txt_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)\n",
    "\n",
    "            # Text background color\n",
    "            txt_params.set_bg_clr = 1\n",
    "            # set(red, green, blue, alpha); set to Black\n",
    "            txt_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)\n",
    "\n",
    "            # Ideally NVDS_EVENT_MSG_META should be attached to buffer by the\n",
    "            # component implementing detection / recognition logic.\n",
    "            # Here it demonstrates how to use / attach that meta data.\n",
    "            if is_first_object and (frame_number % 30) == 0:\n",
    "                # Frequency of messages to be send will be based on use case.\n",
    "                # Here message is being sent for first object every 30 frames.\n",
    "\n",
    "                user_event_meta = pyds.nvds_acquire_user_meta_from_pool(\n",
    "                    batch_meta)\n",
    "                if user_event_meta:\n",
    "                    # Allocating an NvDsEventMsgMeta instance and getting\n",
    "                    # reference to it. The underlying memory is not manged by\n",
    "                    # Python so that downstream plugins can access it. Otherwise\n",
    "                    # the garbage collector will free it when this probe exits.\n",
    "                    msg_meta = pyds.alloc_nvds_event_msg_meta(user_event_meta)\n",
    "                    msg_meta.bbox.top = obj_meta.rect_params.top\n",
    "                    msg_meta.bbox.left = obj_meta.rect_params.left\n",
    "                    msg_meta.bbox.width = obj_meta.rect_params.width\n",
    "                    msg_meta.bbox.height = obj_meta.rect_params.height\n",
    "                    msg_meta.frameId = frame_number\n",
    "                    msg_meta.trackingId = long_to_uint64(obj_meta.object_id)\n",
    "                    msg_meta.confidence = obj_meta.confidence\n",
    "                    msg_meta = generate_event_msg_meta(msg_meta, obj_meta.class_id)\n",
    "\n",
    "                    user_event_meta.user_meta_data = msg_meta\n",
    "                    user_event_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_EVENT_MSG_META\n",
    "\n",
    "                    pyds.nvds_add_user_meta_to_frame(frame_meta,\n",
    "                                                     user_event_meta)\n",
    "                else:\n",
    "                    print(\"Error in attaching event meta to buffer\\n\")\n",
    "\n",
    "                is_first_object = False\n",
    "            try:\n",
    "                l_obj = l_obj.next\n",
    "            except StopIteration:\n",
    "                break\n",
    "        try:\n",
    "            l_frame = l_frame.next\n",
    "        except StopIteration:\n",
    "            break\n",
    "\n",
    "    print(\"Frame Number =\", frame_number, \"Vehicle Count =\",\n",
    "          obj_counter[PGIE_CLASS_ID_VEHICLE], \"Person Count =\",\n",
    "          obj_counter[PGIE_CLASS_ID_PERSON])\n",
    "    return Gst.PadProbeReturn.OK"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Initializing GStreamer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "Gst.init(None)\n",
    "\n",
    "# Deprecated: following meta_copy_func and meta_free_func\n",
    "# have been moved to the binding as event_msg_meta_copy_func()\n",
    "# and event_msg_meta_release_func() respectively.\n",
    "# Hence, registering and unsetting these callbacks in not needed\n",
    "# anymore. Please extend the above functions as necessary instead.\n",
    "# # registering callbacks\n",
    "# pyds.register_user_copyfunc(meta_copy_func)\n",
    "# pyds.register_user_releasefunc(meta_free_func)\n",
    "\n",
    "print(\"Creating Pipeline \\n \")\n",
    "\n",
    "pipeline = Gst.Pipeline()\n",
    "\n",
    "if not pipeline:\n",
    "    sys.stderr.write(\" Unable to create Pipeline \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Creating different elements of the stream"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating a source element for reading from a file"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"Creating Source \\n \")\n",
    "source = Gst.ElementFactory.make(\"filesrc\", \"file-source\")\n",
    "if not source:\n",
    "    sys.stderr.write(\" Unable to create Source \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating a h264 parser as the input file is an elementary h264 stream"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"Creating H264Parser \\n\")\n",
    "h264parser = Gst.ElementFactory.make(\"h264parse\", \"h264-parser\")\n",
    "if not h264parser:\n",
    "    sys.stderr.write(\" Unable to create h264 parser \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Using nvdec_h264 for accelerated decoding on GPU"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"Creating Decoder \\n\")\n",
    "decoder = Gst.ElementFactory.make(\"nvv4l2decoder\", \"nvv4l2-decoder\")\n",
    "if not decoder:\n",
    "    sys.stderr.write(\" Unable to create Nvv4l2 Decoder \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating a nvstreammux instance to form batches for one or more sources"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "streammux = Gst.ElementFactory.make(\"nvstreammux\", \"Stream-muxer\")\n",
    "if not streammux:\n",
    "    sys.stderr.write(\" Unable to create NvStreamMux \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Setting up nvinfer to run inference on the decoders output\n",
    "### Note: Behaviour of inference is set through the config file "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pgie = Gst.ElementFactory.make(\"nvinfer\", \"primary-inference\")\n",
    "if not pgie:\n",
    "    sys.stderr.write(\" Unable to create pgie \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Using a converter to convert from NV12 to RGBA as required by nvosd"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "nvvidconv = Gst.ElementFactory.make(\"nvvideoconvert\", \"convertor\")\n",
    "if not nvvidconv:\n",
    "    sys.stderr.write(\" Unable to create nvvidconv \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create OSD to draw on the converted RGBA buffer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "nvosd = Gst.ElementFactory.make(\"nvdsosd\", \"onscreendisplay\")\n",
    "if not nvosd:\n",
    "    sys.stderr.write(\" Unable to create nvosd \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a message converter"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "msgconv=Gst.ElementFactory.make(\"nvmsgconv\", \"nvmsg-converter\")\n",
    "if not msgconv:\n",
    "    sys.stderr.write(\" Unable to create msgconv \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a message broker"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "msgbroker=Gst.ElementFactory.make(\"nvmsgbroker\", \"nvmsg-broker\")\n",
    "if not msgbroker:\n",
    "    sys.stderr.write(\" Unable to create msgbroker \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a tee "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tee=Gst.ElementFactory.make(\"tee\", \"nvsink-tee\")\n",
    "if not tee:\n",
    "    sys.stderr.write(\" Unable to create tee \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating queues for tee"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "queue1=Gst.ElementFactory.make(\"queue\", \"nvtee-que1\")\n",
    "if not queue1:\n",
    "    sys.stderr.write(\" Unable to create queue1 \\n\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "queue2=Gst.ElementFactory.make(\"queue\", \"nvtee-que2\")\n",
    "if not queue2:\n",
    "    sys.stderr.write(\" Unable to create queue2 \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating sink as required based on if display is required"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if (no_display) :\n",
    "    print(\"Creating FakeSink \\n\")\n",
    "    sink = Gst.ElementFactory.make(\"fakesink\", \"fakesink\")\n",
    "    if not sink:\n",
    "        sys.stderr.write(\" Unable to create fakesink \\n\")\n",
    "else:\n",
    "    if is_aarch64():\n",
    "        print(\"Creating nv3dsink \\n\")\n",
    "        sink = Gst.ElementFactory.make(\"nv3dsink\", \"nv3d-sink\")\n",
    "        if not sink:\n",
    "            sys.stderr.write(\" Unable to create nv3dsink \\n\")\n",
    "    else:\n",
    "        print(\"Creating EGLSink \\n\")\n",
    "        sink = Gst.ElementFactory.make(\"nveglglessink\", \"nvvideo-renderer\")\n",
    "        if not sink:\n",
    "            sys.stderr.write(\" Unable to create egl sink \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Setting PGIE and Streammux properties\n",
    "### Suggested reading: dstest1_pgie_config.txt "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"Playing file %s \" % input_file)\n",
    "source.set_property('location', input_file)\n",
    "streammux.set_property('width', 1920)\n",
    "streammux.set_property('height', 1080)\n",
    "streammux.set_property('batch-size', 1)\n",
    "streammux.set_property('batched-push-timeout', 33000)\n",
    "pgie.set_property('config-file-path', PGIE_CONFIG_FILE)\n",
    "msgconv.set_property('config', MSCONV_CONFIG_FILE)\n",
    "msgconv.set_property('payload-type', schema_type)\n",
    "msgbroker.set_property('proto-lib', proto_lib)\n",
    "msgbroker.set_property('conn-str', conn_str)\n",
    "if cfg_file is not None:\n",
    "    msgbroker.set_property('config', cfg_file)\n",
    "if topic is not None:\n",
    "    msgbroker.set_property('topic', topic)\n",
    "msgbroker.set_property('sync', False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Adding elements to pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"Adding elements to Pipeline \\n\")\n",
    "pipeline.add(source)\n",
    "pipeline.add(h264parser)\n",
    "pipeline.add(decoder)\n",
    "pipeline.add(streammux)\n",
    "pipeline.add(pgie)\n",
    "pipeline.add(nvvidconv)\n",
    "pipeline.add(nvosd)\n",
    "pipeline.add(tee)\n",
    "pipeline.add(queue1)\n",
    "pipeline.add(queue2)\n",
    "pipeline.add(msgconv)\n",
    "pipeline.add(msgbroker)\n",
    "pipeline.add(sink)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Linking elements of pipline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"Linking elements in the Pipeline \\n\")\n",
    "source.link(h264parser)\n",
    "h264parser.link(decoder)\n",
    "\n",
    "sinkpad = streammux.get_request_pad(\"sink_0\")\n",
    "if not sinkpad:\n",
    "    sys.stderr.write(\" Unable to get the sink pad of streammux \\n\")\n",
    "srcpad = decoder.get_static_pad(\"src\")\n",
    "if not srcpad:\n",
    "    sys.stderr.write(\" Unable to get source pad of decoder \\n\")\n",
    "srcpad.link(sinkpad)\n",
    "\n",
    "streammux.link(pgie)\n",
    "pgie.link(nvvidconv)\n",
    "nvvidconv.link(nvosd)\n",
    "nvosd.link(tee)\n",
    "queue1.link(msgconv)\n",
    "msgconv.link(msgbroker)\n",
    "queue2.link(sink)\n",
    "sink_pad = queue1.get_static_pad(\"sink\")\n",
    "tee_msg_pad = tee.get_request_pad('src_%u')\n",
    "tee_render_pad = tee.get_request_pad(\"src_%u\")\n",
    "if not tee_msg_pad or not tee_render_pad:\n",
    "    sys.stderr.write(\"Unable to get request pads\\n\")\n",
    "tee_msg_pad.link(sink_pad)\n",
    "sink_pad = queue2.get_static_pad(\"sink\")\n",
    "tee_render_pad.link(sink_pad)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating an event loop and feeding gstream bus messages to it"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    " loop = GLib.MainLoop()\n",
    "bus = pipeline.get_bus()\n",
    "bus.add_signal_watch()\n",
    "bus.connect(\"message\", bus_call, loop)\n",
    "\n",
    "osdsinkpad = nvosd.get_static_pad(\"sink\")\n",
    "if not osdsinkpad:\n",
    "    sys.stderr.write(\" Unable to get sink pad of nvosd \\n\")\n",
    "    \n",
    "osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Starting the pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "print(\"Starting pipeline \\n\")\n",
    "\n",
    "# start play back and listed to events\n",
    "pipeline.set_state(Gst.State.PLAYING)\n",
    "try:\n",
    "    loop.run()\n",
    "except:\n",
    "    pass"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Cleaning up post successful run"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline.set_state(Gst.State.NULL)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3.9.5 64-bit",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.5"
  },
  "vscode": {
   "interpreter": {
    "hash": "f9f85f796d01129d0dd105a088854619f454435301f6ffec2fea96ecbd9be4ac"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
